package com.ruyuan.eshop.push.service.impl;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ruyuan.eshop.common.concurrent.SafeThreadPool;
import com.ruyuan.eshop.common.constants.RocketMqConstant;
import com.ruyuan.eshop.common.core.JsonResult;
import com.ruyuan.eshop.common.exception.BaseBizException;
import com.ruyuan.eshop.common.message.PlatformMessagePushMessage;
import com.ruyuan.eshop.common.message.PlatformPromotionConditionUserBucketMessage;
import com.ruyuan.eshop.common.message.PlatformPromotionUserBucketMessage;
import com.ruyuan.eshop.common.utils.DateUtil;
import com.ruyuan.eshop.common.utils.JsonUtil;
import com.ruyuan.eshop.membership.api.AccountApi;
import com.ruyuan.eshop.membership.domain.dto.MembershipFilterDTO;
import com.ruyuan.eshop.membership.domain.dto.MembershipAccountDTO;
import com.ruyuan.eshop.persona.api.PersonaApi;
import com.ruyuan.eshop.persona.domain.dto.PersonaFilterConditionDTO;
import com.ruyuan.eshop.push.converter.ConditionConverter;
import com.ruyuan.eshop.push.converter.MessagePushConverter;
import com.ruyuan.eshop.push.dao.MessagePushCrontabDAO;
import com.ruyuan.eshop.push.dao.MessagePushDAO;
import com.ruyuan.eshop.push.domain.dto.PushMessageDTO;
import com.ruyuan.eshop.push.domain.dto.QueryMessageDTO;
import com.ruyuan.eshop.push.domain.dto.SaveOrUpdateMessageDTO;
import com.ruyuan.eshop.push.domain.dto.SendMessageDTO;
import com.ruyuan.eshop.push.domain.entity.MessagePushCrontabDO;
import com.ruyuan.eshop.push.domain.entity.MessagePushDO;
import com.ruyuan.eshop.push.domain.request.QueryMessageRequest;
import com.ruyuan.eshop.push.domain.request.SaveOrUpdateMessageRequest;
import com.ruyuan.eshop.push.domain.request.SendMessageByAccountRequest;
import com.ruyuan.eshop.push.enums.PushTypeEnum;
import com.ruyuan.eshop.push.mq.producer.DefaultProducer;
import com.ruyuan.eshop.push.service.MessagePushService;
import com.ruyuan.eshop.push.splitter.ListSplitter;
import io.jsonwebtoken.lang.Collections;
import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.ruyuan.eshop.common.constants.BatchSizeConstant.MESSAGE_BATCH_SIZE;
import static com.ruyuan.eshop.common.constants.BatchSizeConstant.USER_BUCKET_SIZE;

/**
 * @author zhonghuashishan
 */
@Service
@Slf4j
public class MessagePushServiceImpl implements MessagePushService {

    @Autowired
    private MessagePushDAO messagePushDAO;

    @Autowired
    private MessagePushCrontabDAO messagePushCrontabDAO;

    @Autowired
    private MessagePushConverter messagePushConverter;

    @Autowired
    private DefaultProducer defaultProducer;

    @DubboReference(version = "1.0.0")
    private PersonaApi personaApi;

    /**
     * 选人条件转换器
     */
    @Autowired
    private ConditionConverter conditionConverter;

    /**
     * 发送消息共用的线程池
     */
    @Autowired
    @Qualifier("sharedSendMsgThreadPool")
    private SafeThreadPool sharedSendMsgThreadPool;


    @Transactional(rollbackFor = Exception.class)
    @Override
    public SaveOrUpdateMessageDTO
    saveOrUpdateMessage(SaveOrUpdateMessageRequest saveOrUpdateMessageRequest) {

        SaveOrUpdateMessageDTO saveOrUpdateMessageDTO = new SaveOrUpdateMessageDTO();

        // 构建消息实体
        MessagePushDO messagePushDO = buildMessagePushDO(saveOrUpdateMessageRequest);

        boolean saveFlag = messagePushDAO.save(messagePushDO);

        // 定时发送消息
        if (Objects.equals(saveOrUpdateMessageRequest.getPushType(), PushTypeEnum.DELAY.getCode())) {
            List<MessagePushCrontabDO> messagePushCrontabDOS = generateMessagePushCrontab(saveOrUpdateMessageRequest);

            if (!Collections.isEmpty(messagePushCrontabDOS)){
                messagePushCrontabDAO.saveBatch(messagePushCrontabDOS);
            }

            saveOrUpdateMessageDTO.setSuccess(saveFlag);
            return saveOrUpdateMessageDTO;
        }

        // 构建推送消息DTO
        PushMessageDTO pushMessageDTO = PushMessageDTO.builder()
                .mainMessage(saveOrUpdateMessageRequest.getMainMessage())
                .message(saveOrUpdateMessageRequest.getMessage())
                .informType(saveOrUpdateMessageRequest.getInformType())
                .build();

        // 推送消息
        pushMessages(pushMessageDTO, saveOrUpdateMessageRequest.getMembershipFilterDTO());

        saveOrUpdateMessageDTO.setSuccess(true);
        return saveOrUpdateMessageDTO;
    }

    /**
     * 生成消息发送任务实体数据
     *
     * @param request
     * @return
     */
    private List<MessagePushCrontabDO> generateMessagePushCrontab(SaveOrUpdateMessageRequest request) {
        Integer sendPeriodCount = request.getSendPeriodCount();
        List<MessagePushCrontabDO> messagePushCrontabDOS = new ArrayList<>();
        if (sendPeriodCount == 1) {
            // 周期内只发送一次，直接使用startTime作为定时任务的执行时间
            MessagePushCrontabDO messagePushCrontab = buildMessagePushCrontabDO(request, 1, request.getPushStartTime());
            messagePushCrontabDOS.add(messagePushCrontab);
        } else {
            LocalDateTime startTime = DateUtil.convertLocalDateTime(request.getPushStartTime());
            LocalDateTime endTime = DateUtil.convertLocalDateTime(request.getPushEndTime());

            // 开始时间和结束时间相隔分钟数
            Long minutes = DateUtil.betweenMinutes(startTime, endTime);

            // 相邻执行周期间隔分钟数
            long periodMinutes = minutes / (sendPeriodCount - 1);

            for (int i = 1; i <= sendPeriodCount; i++) {
                /*
                 * 任务执行时间计算逻辑
                 * 从起始时间开始，作为第一次
                 * 后面每次间隔 periodMinutes 执行一次
                 * 最后一次使用结束时间
                 */
                MessagePushCrontabDO messagePushCrontab;
                if (i == sendPeriodCount){
                    messagePushCrontab = buildMessagePushCrontabDO(request, i, request.getPushEndTime());
                } else {
                    LocalDateTime crontabTime = startTime.plusMinutes(periodMinutes * (i - 1));
                    messagePushCrontab = buildMessagePushCrontabDO(request, i, DateUtil.convertDate(crontabTime));
                }
                messagePushCrontabDOS.add(messagePushCrontab);
            }
        }
        return messagePushCrontabDOS;
    }

    /**
     * 构建消息发送任务实体
     * @param request
     * @param periodNumber
     * @param crontabTime
     * @return
     */
    private MessagePushCrontabDO buildMessagePushCrontabDO(SaveOrUpdateMessageRequest request,
                                                           int periodNumber, Date crontabTime) {
        MessagePushCrontabDO messagePushCrontabDO = messagePushConverter.convertMessageCrontabDO(request);
        messagePushCrontabDO.setFilterCondition(JsonUtil.object2Json(request.getMembershipFilterDTO()));
        messagePushCrontabDO.setCrontabTime(crontabTime);
        messagePushCrontabDO.setPeriodSendNumber(periodNumber);
        return messagePushCrontabDO;
    }

    /**
     * 构建消息实体
     *
     * @param saveOrUpdateMessageRequest
     * @return
     */
    private MessagePushDO buildMessagePushDO(SaveOrUpdateMessageRequest saveOrUpdateMessageRequest) {
        MessagePushDO messagePushDO = messagePushConverter.convertMessageDO(saveOrUpdateMessageRequest);
        messagePushDO.setFilterCondition(JsonUtil.object2Json(saveOrUpdateMessageRequest.getMembershipFilterDTO()));
        return messagePushDO;
    }

    @Override
    public List<QueryMessageDTO> queryMessageByCondition(QueryMessageRequest queryMessageRequest) {
        MessagePushDO messagePushDO = messagePushConverter.requestToEntity(queryMessageRequest);

        QueryWrapper<MessagePushDO> wrapper = new QueryWrapper<>();
        wrapper.setEntity(messagePushDO);

        return messagePushConverter.listEntityToDTO(messagePushDAO.list(wrapper));
    }

    @Transactional(rollbackFor = Exception.class)
    @Override
    public SendMessageDTO sendMessageByAccount(SendMessageByAccountRequest request) {
        MessagePushDO messagePushDO = messagePushConverter.convertMessageDO(request);
        messagePushDAO.save(messagePushDO);

        PlatformMessagePushMessage platformMessagePushMessage = buildPlatformMessagePushMessage(request);
        String msgJson = JsonUtil.object2Json(platformMessagePushMessage);
        defaultProducer.sendMessage(RocketMqConstant.PLATFORM_MESSAGE_SEND_TOPIC, msgJson, "平台消息推送消息");

        SendMessageDTO sendMessageDTO = new SendMessageDTO();
        sendMessageDTO.setSuccess(true);
        return sendMessageDTO;
    }


    /**
     * 按条件推送消息：推送逻辑使用分片逻辑
     * @param pushMessageDTO
     * @param membershipFilterDTO
     */
    @Override
    public void pushMessages(PushMessageDTO pushMessageDTO, MembershipFilterDTO membershipFilterDTO) {

        // 1.获取当前条件下的count值
        PersonaFilterConditionDTO conditionDTO = conditionConverter
                .convertFilterCondition(membershipFilterDTO);
        JsonResult<Integer> countResult = personaApi.countByCondition(conditionDTO);
        if (!countResult.getSuccess()) {
            throw new BaseBizException(countResult.getErrorCode(), countResult.getErrorMessage());
        }
        // 2.根据count值分片
        Integer count = countResult.getData();

        // 3、分成m个分片，每个分片中包含：（1）分片id（2）用户个数，
        // 例：maxUserId = 100w; userBucketSize=1000
        // userBucket1 = [1, 1000)
        // userBucket2 = [2, 1000)
        // userBucket2 = [n, 756)  最后一个分片可能数量不足1000
        // userBucketCount = 1000
        Map<Integer, Integer> userBuckets = new LinkedHashMap<>();
        AtomicBoolean flagRef = new AtomicBoolean(true);
        Integer shardId = 1;
        while (flagRef.get()) {
            if (USER_BUCKET_SIZE > count) {
                userBuckets.put(shardId, USER_BUCKET_SIZE);
                flagRef.compareAndSet(true, false);
            }
            userBuckets.put(shardId, USER_BUCKET_SIZE);
            shardId += 1;
            count -= USER_BUCKET_SIZE;
        }

        // 4、批量发送消息
        // 例：userBucketCount = 1000; messageBatchSize = 100
        List<String> messages = new ArrayList<>();
        PlatformPromotionConditionUserBucketMessage message =
                PlatformPromotionConditionUserBucketMessage.builder()
                .personaFilterCondition(JSON.toJSONString(conditionDTO))
                .build();
        for (Map.Entry<Integer, Integer> userBucket : userBuckets.entrySet()) {
            message.setShardId(userBucket.getKey());
            message.setBucketSize(userBucket.getValue());
            String jsonMessage = JsonUtil.object2Json(message);
            messages.add(jsonMessage);
        }
        log.info("本次推送消息数量,{}",messages.size());
        ListSplitter splitter = new ListSplitter(messages, MESSAGE_BATCH_SIZE);
        while(splitter.hasNext()){
            List<String> sendBatch = splitter.next();
            log.info("本次批次消息数量,{}",sendBatch.size());
            sharedSendMsgThreadPool.execute(() -> {
                defaultProducer.sendMessages(RocketMqConstant.PLATFORM_CONDITION_COUPON_SEND_USER_BUCKET_TOPIC, sendBatch, "部分用户优惠活动用户桶消息");
            });
        }
    }

    private PlatformMessagePushMessage buildPlatformMessagePushMessage(SendMessageByAccountRequest request) {
        PlatformMessagePushMessage platformMessagePushMessage = PlatformMessagePushMessage.builder()
                .mainMessage(request.getMainMessage())
                .message(request.getMessage())
                .informType(request.getInformType())
                .userAccountId(request.getAccountId())
                .build();
        return platformMessagePushMessage;
    }
}
